Conversation

@chenmoneygithub (Collaborator):

Previously we limited stream listeners to work only with str fields and some prebuilt types. This PR lifts that constraint and allows streaming on any type. Here is the gist:

  • StreamListener works the same way as before, i.e., it only captures streaming chunks associated with a certain field.
  • We don't perform additional handling on the structured output value for each field; we just give back the raw value.

To accommodate JSONAdapter, which doesn't have clear boilerplate like [[ ## answer ## ]] to split the fields' streams the way ChatAdapter does, we add dedicated chunk-handling logic based on jiter.
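Here is a minimal usage sketch (the model name, signature, and question are assumptions for illustration, not part of this PR):

import asyncio
import dspy
from pydantic import BaseModel

class Answer(BaseModel):
    text: str
    certainty: float

# Assumed setup: any LM works; JSONAdapter exercises the new jiter-based path.
dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"), adapter=dspy.JSONAdapter())

# Listen to the Pydantic-typed "answer" field; its chunks arrive as raw strings.
program = dspy.streamify(
    dspy.Predict("question -> answer: Answer"),
    stream_listeners=[dspy.streaming.StreamListener(signature_field_name="answer")],
)

async def main():
    async for value in program(question="Why is the sky blue?"):
        if isinstance(value, dspy.streaming.StreamResponse):
            print(value.chunk, end="")  # raw JSON text of the answer field

asyncio.run(main())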

@chenmoneygithub marked this pull request as draft September 20, 2025 01:07
@chenmoneygithub marked this pull request as ready for review September 20, 2025 22:44
# Other adapters rely on the end_identifier to detect the end of the field we are listening to.
return self._default_handle_stream_chunk(token, end_identifier)

def _json_adapter_handle_stream_chunk(self, token: str, chunk_message: str) -> str:

@TomeHirata (Collaborator), Oct 20, 2025:

The return signature should be StreamResponse | None

@chenmoneygithub (Author):

thanks for catching it!

is_last_chunk=self.stream_end,
)

def _default_handle_stream_chunk(self, token: str, end_identifier: str) -> str:

Collaborator:

ditto

@chenmoneygithub (Author):

ty!

elif self.field_end_queue.qsize() > 10:
# Buffer could form end identifier, but we've exceeded max buffer size
# Yield the oldest token to prevent unbounded buffering
# We keep the last 10 tokens in the buffer to avoid sending the DSPy boilerplate tokens to users.

Collaborator:

Let's note that we buffer only if tokens in the queue can form the boilerplate.

@chenmoneygithub (Author):

sg!
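To make the buffering condition concrete, here is a standalone sketch (the function name, queue handling, and flushing policy are illustrative, not the PR's exact code):

from queue import Queue

def buffer_or_yield(queue: Queue, token: str, end_identifier: str, max_size: int = 10) -> str:
    """Buffer tokens only while they could still form the end identifier;
    otherwise flush them to the user immediately."""
    queue.put(token)
    buffered = "".join(queue.queue)
    if end_identifier.startswith(buffered):
        # The buffered tokens may be the start of the boilerplate marker:
        # hold them back, but cap the buffer so output is never withheld forever.
        return queue.get() if queue.qsize() > max_size else ""
    # The buffer can no longer form the end identifier: flush everything.
    out = []
    while not queue.empty():
        out.append(queue.get())
    return "".join(out)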

return StreamResponse(self.predict_name, self.signature_field_name, token, is_last_chunk=self.stream_end)

try:
parsed = jiter.from_json(

Collaborator:

This is interesting, can't we just count the number of { and }?

@chenmoneygithub (Author):

Discussed offline; please see the new implementation for a more robust solution.
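For illustration of why jiter is more robust than brace counting: its partial mode parses incomplete JSON correctly even when string values contain braces (this example assumes jiter's documented partial_mode flag):

import jiter

# A truncated stream whose string value contains a '{' that would
# confuse naive brace counting.
partial = b'{"answer": "use {braces} carefully, and'
parsed = jiter.from_json(partial, partial_mode="trailing-strings")
print(parsed)  # {'answer': 'use {braces} carefully, and'}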

@TomeHirata requested a review from Copilot November 4, 2025 08:40
self.cache_hit = False
self.allow_reuse = allow_reuse

self.json_adapter_state = {"field_accumulated_tokens": ""}

@TomeHirata (Collaborator), Nov 4, 2025:

Do we plan to introduce other keys to self.json_adapter_state, or can we flatten the structure?

Copilot AI (Contributor) left a comment:

Pull Request Overview

This PR adds support for streaming Pydantic models in JSONAdapter by implementing partial JSON parsing for field detection. Previously, streaming was only supported for string and dspy.Type fields.

  • Removed the _is_streamable validation check to allow Pydantic models to be streamed
  • Implemented JSONAdapter-specific streaming logic using the jiter library for partial JSON parsing
  • Updated test data to correctly format JSON string values with quotes

Reviewed Changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 6 comments.

  • tests/streaming/test_streaming.py: Added comprehensive test coverage for Pydantic model streaming with ChatAdapter and JSONAdapter; corrected test assertions to include proper JSON quote formatting.
  • dspy/streaming/streaming_listener.py: Refactored streaming logic to support Pydantic models via partial JSON parsing; removed the streamability validation that blocked Pydantic models.
Comments suppressed due to low confidence (1)

tests/streaming/test_streaming.py:1122

  • The removed assertion at line 1128, assert final_prediction.answer == "According to the references, water boils at 100°C.", is no longer present, which means the final prediction's answer field is no longer validated. This reduces test coverage for the citations streaming test.
            assert "".join(answer_chunks) == "According to the references, water boils at 100°C."


# Other adapters rely on the end_identifier to detect the end of the field we are listening to.
return self._default_handle_stream_chunk(token, end_identifier)

def _json_adapter_handle_stream_chunk(self, token: str, chunk_message: str) -> StreamResponse | None:

Copilot AI, Nov 4, 2025:

Mixing implicit and explicit returns may indicate an error, as implicit returns always return None.

return self._default_handle_stream_chunk(token, end_identifier)

def _json_adapter_handle_stream_chunk(self, token: str, chunk_message: str) -> StreamResponse | None:
self.json_adapter_state["field_accumulated_tokens"] += chunk_message

Collaborator:

Why do we accumulate chunk_message instead of token?

@chenmoneygithub (Author), Nov 7, 2025:

field_accumulated_tokens is probably a bad name; I'm renaming it to field_accumulated_messages.

@chenmoneygithub (Author):

Basically, token is the value we return, and chunk_message is the new message we are receiving.
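A rough sketch of the accumulate-then-parse flow being discussed (the helper, state keys, and delta policy are hypothetical, not the PR's exact code):

import json
import jiter

def field_delta(state: dict, chunk_message: str, field_name: str) -> str | None:
    """Hypothetical: accumulate raw JSON text, partially parse it, and return
    the not-yet-yielded portion of the listened field's value."""
    state["field_accumulated_messages"] += chunk_message
    try:
        parsed = jiter.from_json(
            state["field_accumulated_messages"].encode(),
            partial_mode="trailing-strings",
        )
    except ValueError:
        return None  # Not parseable yet; keep accumulating.
    if field_name not in parsed:
        return None
    value = parsed[field_name]
    # Serialize non-str values so the caller always receives raw string chunks.
    text = value if isinstance(value, str) else json.dumps(value)
    delta = text[len(state.get("yielded", "")):]
    state["yielded"] = text
    return delta or None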

return StreamResponse(
    self.predict_name, self.signature_field_name, token, is_last_chunk=self.stream_end
)
except Exception:

Collaborator:

can't we limit this to be ValueError?

@chenmoneygithub (Author):

yes, good call!
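(For reference, jiter raises ValueError on incomplete or invalid input when partial mode is off:)

import jiter

try:
    jiter.from_json(b'{"answer": "partial')  # incomplete JSON, no partial mode
except ValueError as exc:
    print(f"jiter raised: {exc}")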

self.stream_end = True
last_token = self.flush()

keys = list(parsed.keys())

@TomeHirata (Collaborator), Nov 4, 2025:

Is parsed.keys() ordered based on the key order in the JSON string?

@chenmoneygithub (Author):

I think so. It shouldn't affect the logic here though; we just need the key name to cut off the extra characters.
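(jiter returns a plain Python dict, and Python dicts preserve insertion order, so keys come back in document order; a quick check:)

import jiter

parsed = jiter.from_json(b'{"reasoning": "...", "answer": "42"}')
print(list(parsed.keys()))  # ['reasoning', 'answer'], i.e., document order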

if field_type is str:
return True
if issubclass(field_type, Type):
return field_type.is_streamable()

Collaborator:

Shall we delete is_streamable method of Type?

@chenmoneygithub (Author):

Yes, good question. I did think about it, and my take is that it's still useful: it lets the streaming listener hit this path for certain typed fields like Citation. A custom type that isn't streamable will fall back to the normal streaming handling.

        # Handle custom streamable types
        if self._output_type and issubclass(self._output_type, Type) and self._output_type.is_streamable():
            if parsed_chunk := self._output_type.parse_stream_chunk(chunk):
                return StreamResponse(
                    self.predict_name,
                    self.signature_field_name,
                    parsed_chunk,
                    is_last_chunk=self.stream_end,
                )
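
To make the Citation remark concrete, a custom streamable type might look roughly like this (a hypothetical sketch; the exact chunk type passed to parse_stream_chunk is an assumption):

import dspy

class Shout(dspy.Type):
    """Hypothetical custom type that opts into the streamable path."""

    text: str

    @classmethod
    def is_streamable(cls) -> bool:
        return True

    @classmethod
    def parse_stream_chunk(cls, chunk):
        # chunk is assumed to be a litellm ModelResponseStream; return the
        # display text for this chunk, or None to emit nothing.
        content = chunk.choices[0].delta.content
        return content.upper() if content else None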

return StreamResponse(
    self.predict_name, self.signature_field_name, token, is_last_chunk=self.stream_end
)

Collaborator:

So overall we will return raw string chunks, and the deserialization needs to happen on the caller side?

@chenmoneygithub (Author):

Yes! It should be pretty simple for the caller to accumulate.
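
A caller-side sketch of that accumulation, reusing the Answer model from the first example (the wiring is an assumption):

chunks: list[str] = []
async for value in program(question="Why is the sky blue?"):
    if isinstance(value, dspy.streaming.StreamResponse):
        chunks.append(value.chunk)

# Once the field's stream ends, the accumulated text is the field's JSON.
answer = Answer.model_validate_json("".join(chunks))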
